#include <sys/poll.h>
#include <errno.h>
#include <sys/ioctl.h>
#include <getopt.h>
#include <unistd.h>
#include <sys/mman.h>

#define DBG_SUBSYS S_LIBYNET

#include "ynet_rpc.h"
#include "../../ynet/sock/sock_tcp.h"
#include "dbg.h"

//#define MAX_BUF_LEN 65536

/* TCP port (as a service string) the server listens on and the client connects to. */
#define TEST_NET_PORT  "12341"
/* Number of bytes transferred by every read/write (or send/recv) in the ping-pong loop. */
#define MSG_SIZE 64
/*
 * Build-time switch: when USE_SPLICE is defined the workers move data with
 * the __splice_in()/__splice_out() helpers instead of read/write/send/recv.
 * Currently disabled.
 */
#if 0
#define USE_SPLICE
#endif

/*
 * Build-time switch: when USE_EPOLL is defined the server worker waits for
 * input with epoll_wait() instead of poll(). Currently disabled.
 */
#if 0
#define USE_EPOLL
#endif

/*
 * Operating mode chosen on the command line.
 *
 * The numeric values are spelled out because main() initialises its mode
 * variable to 0, which therefore selects OP_SRV when no mode flag is given.
 */
typedef enum {
        OP_SRV = 0,     /* run the listening side (__test_srv) */
        OP_CLI = 1,     /* run the load-generating side (__test_cli) */
} op_t;

/*
 * Per-connection state handed from __test_srv_accept() to the server worker
 * thread. The worker closes the socket and frees the structure when it exits.
 */
typedef struct {
        int sd;         /* accepted client socket descriptor */
} session_t;

#ifdef USE_SPLICE

/*
 * Receive up to `size` bytes from socket `sd` into `buf`, staging the data
 * through the pipe `_pipe` (`_pipe[1]` is the write end, `_pipe[0]` the
 * read end).
 *
 * Returns the byte count reported by vmsplice() on success, 0 when splice()
 * transferred nothing from the socket (end of stream), or a negated errno
 * value on failure.
 */
inline static int __splice_in(int sd, int *_pipe, char *buf, int size)
{
        int ret;
        struct iovec iov;

        ret = splice(sd, NULL, _pipe[1], NULL, size, SPLICE_F_MOVE);
        if (ret == -1) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        /*
         * Nothing was queued in the pipe. Draining it now would wait on an
         * empty blocking pipe whose write end we still hold open, so hand
         * the end-of-stream indication straight back to the caller.
         */
        if (ret == 0)
                return 0;

        iov.iov_base = buf;
        iov.iov_len = size;

        /* Drain the staged bytes from the pipe into the caller's buffer. */
        ret = vmsplice(_pipe[0], &iov, 1, SPLICE_F_MOVE);
        if (ret < 0) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        return ret;
err_ret:
        return -ret;
}

/*
 * Send `size` bytes of `buf` to socket `sd` by way of the pipe `_pipe`:
 * the user buffer is first attached to the pipe's write end with vmsplice(),
 * then the pipe's read end is spliced into the socket.
 *
 * Returns the byte count reported by splice() on success, or a negated
 * errno value on failure.
 */
inline static int __splice_out(int sd, int *_pipe, char *buf, int size)
{
        int ret;
        struct iovec vec = {
                .iov_base = buf,
                .iov_len = size,
        };

        /* Stage 1: user memory -> pipe. */
        ret = vmsplice(_pipe[1], &vec, 1, SPLICE_F_MOVE);
        if (ret < 0) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        /* Stage 2: pipe -> socket. */
        ret = splice(_pipe[0], NULL, sd, NULL, size, SPLICE_F_MOVE);
        if (ret == -1) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        return ret;

err_ret:
        return -ret;
}

#endif

/*
 * Server-side connection thread (started by __test_srv_accept()).
 *
 * `arg` is a session_t * owned by this thread. The thread loops forever:
 * wait for the socket to become readable, read MSG_SIZE bytes, then write
 * MSG_SIZE bytes back. Any I/O error, or a zero-length read (treated as
 * ECONNRESET), ends the loop; the pipes, buffers, socket and session are
 * then released. Always returns NULL.
 *
 * NOTE(review): the reply is sent from `out`, which is never filled in this
 * function, so the payload content is whatever the allocator returned.
 * NOTE(review): the buffers come from posix_memalign() but are released with
 * yfree(); presumably yfree() wraps free() -- confirm.
 */
static void *__test_srv_wroker(void *arg)
{
        int ret, pipe_in[2], pipe_out[2];
        session_t *session = arg;
        //char in[MAX_BUF_LEN], out[MAX_BUF_LEN];
        char *in = NULL, *out = NULL;

#if 1
        /*
         * posix_memalign() returns 0 on success and a positive error number
         * on failure (it never returns a negative value), so test for
         * non-zero and keep the reported code in `ret`.
         */
        ret = posix_memalign((void **)&in, 4096, MAX_BUF_LEN);
        if (ret) {
                UNIMPLEMENTED(__DUMP__);
        }

        ret = posix_memalign((void **)&out, 4096, MAX_BUF_LEN);
        if (ret) {
                UNIMPLEMENTED(__DUMP__);
        }
#endif

        /* The pipes are only used as staging areas by the splice helpers. */
        ret = pipe(pipe_in);
        if (ret < 0) {
                ret = errno;
                UNIMPLEMENTED(__DUMP__);
        }

        ret = pipe(pipe_out);
        if (ret < 0) {
                ret = errno;
                UNIMPLEMENTED(__DUMP__);
        }

#ifdef USE_EPOLL
        int epollfd;
        struct epoll_event event, ev;

        epollfd = epoll_create(1);
        if (epollfd < 0 ) {
                ret = errno;
                UNIMPLEMENTED(__DUMP__);
        }

        event.data.fd = session->sd;
        event.events = EPOLLIN;
        ret = epoll_ctl(epollfd, EPOLL_CTL_ADD, session->sd, &event);
        if (ret < 0) {
                ret = errno;
                UNIMPLEMENTED(__DUMP__);
        }

#else
        struct pollfd pfd;
        pfd.fd = session->sd;
        pfd.events = POLLIN;
        pfd.revents = 0;
#endif

        while (1) {
                /* Block (no timeout) until the client has sent something. */
#ifdef USE_EPOLL
                ret = epoll_wait(epollfd, &ev, 1, -1);
                if (ret < 0) {
                        ret = errno;
                        GOTO(err_ret, ret);
                }

                YASSERT(ret == 1);

                DBUG("epoll ret %u\n", ev.data.fd);
#else
                ret = poll(&pfd, 1, -1);
                if (ret == -1) {
                        ret = errno;
                        GOTO(err_ret, ret);
                }
#endif

#ifndef USE_SPLICE
                ret = read(session->sd, in, MSG_SIZE);
                if (ret == -1) {
                        ret = errno;
                        GOTO(err_ret, ret);
                }

#else
                ret = __splice_in(session->sd, pipe_in, in, MSG_SIZE);
                if (ret < 0) {
                        ret = -ret;
                        GOTO(err_ret, ret);
                }
#endif

                /* Zero bytes received: the peer has gone away. */
                if (ret == 0) {
                        ret = ECONNRESET;
                        GOTO(err_ret, ret);
                }

#ifndef USE_SPLICE
                ret = write(session->sd, out, MSG_SIZE);
                if (ret == -1) {
                        ret = errno;
                        GOTO(err_ret, ret);
                }
#else
                ret = __splice_out(session->sd, pipe_out, out, MSG_SIZE);
                if (ret < 0) {
                        ret = -ret;
                        GOTO(err_ret, ret);
                }
#endif
        }

        return NULL;
err_ret:
#ifdef USE_EPOLL
        /* Release the epoll instance along with the other descriptors. */
        close(epollfd);
#endif
        close(pipe_in[0]);
        close(pipe_in[1]);
        close(pipe_out[1]);
        close(pipe_out[0]);
        yfree((void **)&in);
        yfree((void **)&out);
        close(session->sd);
        yfree((void **)&session);
        return NULL;
}

static int __test_srv_accept(int srv_sd)
{
        int ret, sd;
        struct sockaddr_in sin;
        socklen_t alen;
        pthread_t th;
        pthread_attr_t ta;
        session_t *session;

        _memset(&sin, 0, sizeof(sin));
        alen = sizeof(struct sockaddr_in);

        sd = accept(srv_sd, &sin, &alen);
        if (sd < 0 ) {
                ret = errno;
		GOTO(err_ret, ret);
        }

        ret = tcp_sock_tuning(sd, 1, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DINFO("accept new sd %u\n", sd);

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        ret = ymalloc((void **)&session, sizeof(*session));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        session->sd = sd;

        ret = pthread_create(&th, &ta, __test_srv_wroker, session);
        if (unlikely(ret))
                GOTO(err_sd, ret);

        return 0;
err_sd:
        (void) close(sd);
err_ret:
        return ret;
}

static int __test_srv()
{
        int ret;
        int listen_sd;

        //ret = net_hostconnect(&proxysd, host, "10090", YNET_RPC_BLOCK);
        ret = tcp_sock_hostlisten(&listen_sd, NULL, TEST_NET_PORT,
                                  YNET_QLEN, YNET_RPC_BLOCK, 0);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        while (1) {
                ret = __test_srv_accept(listen_sd);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}


/*
 * Per-thread argument/statistics slot for the client side. __test_cli()
 * fills one slot per worker thread and __test_cli_print() sums the counters.
 */
typedef struct {
        sem_t sem;      /* not referenced by the code in this file section */
        uint64_t io;    /* completed send+receive round trips, incremented by the worker */
        int sd;         /* connected socket used by the worker */
        int sendsize;   /* not referenced by the code in this file section */
        int recvsize;   /* not referenced by the code in this file section */
} prof_net_arg_t;


/*
 * Client-side load thread (started by __test_cli()).
 *
 * `_arg` is a prof_net_arg_t * whose `sd` is an already connected socket.
 * The thread loops forever: send MSG_SIZE bytes from `in`, receive MSG_SIZE
 * bytes into `out`, and bump `arg->io` once per completed round trip. Any
 * failure or short transfer trips UNIMPLEMENTED()/YASSERT(); the loop has
 * no normal exit.
 *
 * NOTE(review): only the first 10 bytes of `in` are written by strcpy();
 * the remainder of the MSG_SIZE payload is whatever the allocator returned.
 */
static void *__test_cli_worker(void *_arg)
{
        int ret, pipe_in[2], pipe_out[2];
        prof_net_arg_t *arg = _arg;
        char *in = NULL, *out = NULL;

#if 1
        /*
         * posix_memalign() returns 0 on success and a positive error number
         * on failure (it never returns a negative value), so test for
         * non-zero and keep the reported code in `ret`.
         */
        ret = posix_memalign((void **)&in, 4096, MAX_BUF_LEN);
        if (ret) {
                UNIMPLEMENTED(__DUMP__);
        }

        ret = posix_memalign((void **)&out, 4096, MAX_BUF_LEN);
        if (ret) {
                UNIMPLEMENTED(__DUMP__);
        }
#endif

        /* The pipes are only used as staging areas by the splice helpers. */
        ret = pipe(pipe_in);
        if (ret < 0) {
                ret = errno;
                UNIMPLEMENTED(__DUMP__);
        }

        ret = pipe(pipe_out);
        if (ret < 0) {
                ret = errno;
                UNIMPLEMENTED(__DUMP__);
        }

        while (1) {
                strcpy(in, "xxxxxxxxx");
#ifndef USE_SPLICE
                ret = send(arg->sd, in, MSG_SIZE, MSG_DONTWAIT);
                if (ret < 0) {
                        ret = errno;
                        UNIMPLEMENTED(__DUMP__);
                }
#else
                /* Send the same buffer the non-splice path sends (`in`). */
                ret = __splice_out(arg->sd, pipe_out, in, MSG_SIZE);
                if (ret < 0) {
                        ret = -ret;
                        UNIMPLEMENTED(__DUMP__);
                }
#endif

                YASSERT(ret == MSG_SIZE);
#ifndef USE_SPLICE
                ret = recv(arg->sd, out, MSG_SIZE, 0);
                if (ret < 0) {
                        ret = errno;
                        UNIMPLEMENTED(__DUMP__);
                }
#else
                /* Inbound data is staged through its own pipe (`pipe_in`). */
                ret = __splice_in(arg->sd, pipe_in, out, MSG_SIZE);
                if (ret < 0) {
                        ret = -ret;
                        UNIMPLEMENTED(__DUMP__);
                }
#endif

                YASSERT(ret == MSG_SIZE);

                arg->io++;
        }

        return NULL;
}

/*
 * Print the aggregate request rate of all client workers.
 *
 * Sums the `io` counters of the first `threads` entries of `args`, measures
 * the time elapsed since `begin` with _time_used(), and prints
 * "iops : <total * 1000000 / elapsed>" on stdout. The scaling factor
 * suggests _time_used() reports microseconds -- TODO confirm.
 *
 * The counters are read without synchronisation while the workers update
 * them, so the figure is a best-effort snapshot.
 */
static void __test_cli_print(prof_net_arg_t *args, int threads, struct timeval *begin)
{
        int i;
        uint64_t io, used;
        struct timeval now;

        io = 0;
        for (i = 0; i < threads; i++) {
                io += args[i].io;
        }

        _gettimeofday(&now, NULL);

        used = _time_used(begin, &now);

        /* Print the full 64-bit values instead of truncating them to int. */
        DBUG("io %llu, used %llu\n", (unsigned long long)io,
             (unsigned long long)used);

        /* No measurable time has passed: avoid dividing by zero. */
        if (used == 0) {
                printf("iops : %llu\n", 0ULL);
                return;
        }

        printf("iops : %llu\n", (unsigned long long)(io * 1000 * 1000 / used));
}


static int __test_cli(const char *srv, int threads, int runtime)
{
        int ret, i;
        net_handle_t nh;
        struct timeval begin;
        prof_net_arg_t args[1024], *arg;
        pthread_t th;
        pthread_attr_t ta;

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        _gettimeofday(&begin, NULL);

        for (i = 0; i < threads; i++) {
                arg = &args[i];
                arg->io = 0;

                ret = tcp_sock_hostconnect(&nh, srv, TEST_NET_PORT, 0, 10, 0);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }

                arg->sd = nh.u.sd.sd;

                ret = tcp_sock_tuning(arg->sd, 1, 0);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = pthread_create(&th, &ta, __test_cli_worker, arg);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        DWARN("runtime %u\n", runtime);

        //end = gettime();
        i = 0;
        while (i < runtime) {
                sleep(1);
                i++;
                __test_cli_print(args, threads, &begin);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Command-line front end.
 *
 *   -s / --srv          run as server (also the default when no mode is given,
 *                       because `op` starts at 0 == OP_SRV)
 *   -c / --cli <host>   run as client against <host>
 *   --thread <n>        number of client connections/threads (default "1")
 *   --runtime <sec>     client reporting duration (default "1048576000")
 *   --sendsize, --recvsize  parsed and stored but not used further here
 *
 * Any other option is rejected with EINVAL. Returns 0 on success or the
 * error code of the selected mode.
 */
int main(int argc, char *argv[])
{
        int ret, op = 0;
        /*
         * getopt_long() returns an int and signals the end of the options
         * with -1. Storing it in a plain char breaks that comparison on
         * platforms where char is unsigned, so keep the full int.
         */
        int c_opt;
        const char *server = NULL;
        const char *sendsize = "0", *recvsize = "0", *thread = "1", *runtime = "1048576000";

        (void) sendsize;
        (void) recvsize;
        (void) runtime;

        while (1) {
                int option_index = 0;

                /* Entries 0-3 have no short form; they are told apart by index. */
                static struct option long_options[] = {
                        { "sendsize", required_argument, 0, 0},
                        { "recvsize", required_argument, 0, 0},
                        { "thread", required_argument, 0, 0},
                        { "runtime", required_argument, 0, 0},
                        { "srv", no_argument, 0, 's'},
                        { "cli", required_argument, 0, 'c'},
                        { "verbose", 0, 0, 'v' },
                        { "help",    0, 0, 'h' },
                        { 0, 0, 0, 0 },
                };

                c_opt = getopt_long(argc, argv, "c:s", long_options, &option_index);
                if (c_opt == -1)
                        break;

                switch (c_opt) {
                case 0:
                        switch (option_index) {
                        case 0:
                                sendsize = optarg;
                                break;
                        case 1:
                                recvsize = optarg;
                                break;
                        case 2:
                                thread = optarg;
                                break;
                        case 3:
                                runtime = optarg;
                                break;
                        default:
                                fprintf(stderr, "Hoops, wrong op got!\n");
                                YASSERT(0);
                        }

                        break;
                case 'c':
                        op = OP_CLI;
                        server = optarg;
                        DWARN("srv %s\n", server);
                        break;
                case 's':
                        op = OP_SRV;
                        break;
                default:
                        fprintf(stderr, "Hoops, wrong op got!\n");
                        ret = EINVAL;
                        GOTO(err_ret, ret);
                }
        }

        switch (op) {
        case OP_SRV:
                ret = __test_srv();
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                break;
        case OP_CLI:
                ret = __test_cli(server, atoi(thread), atoi(runtime));
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                break;
        default:
                UNIMPLEMENTED(__DUMP__);
        }

        return 0;
err_ret:
        return ret;
}
